Flink 1.14: Fix the flaky testHashDistributeMode by ingesting all rows in one checkpoint cycle. #4189
Conversation
…in one checkpoint precisely.
I ran this 20 times on my host and everything seems OK:

```java
List<Row> dataSet = ImmutableList.of(
    Row.of(1, "aaa"), Row.of(1, "bbb"), Row.of(1, "ccc"),
    Row.of(2, "aaa"), Row.of(2, "bbb"), Row.of(2, "ccc"),
    Row.of(3, "aaa"), Row.of(3, "bbb"), Row.of(3, "ccc"));
String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
```
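For context, a sketch of how such a registered data set is typically wired into the Flink SQL source table in these tests; the connector identifier and column types below are assumptions for illustration, not verbatim from this PR:

```java
// Hypothetical wiring: create a bounded source table backed by the registered
// data set. The connector name 'BoundedSource' and the (id, data) schema are
// assumptions based on the shape of the test snippet above.
sql("CREATE TABLE %s (id INT NOT NULL, data STRING NOT NULL) " +
    "WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId);
```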
Shall we produce more than one checkpoint, and add enough records in each part instead of enumerating them?
I think a single checkpoint is good enough to validate the PartitionKeySelector; more checkpoints would make the unit test more complex while validating the same thing, in my view.
Mocking more records for the test data set sounds good to me.
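To make the point concrete, here is an illustrative stand-in for what a partition-key-based KeySelector does (the class below is a sketch, not Iceberg's actual PartitionKeySelector): keying rows by their partition value routes every row of a partition to the same writer subtask, so each partition yields one file per checkpoint cycle.

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.types.Row;

// Sketch: key rows by the partition column so all rows of one partition
// hash to the same writer subtask. Field 1 ("data") is assumed to be the
// partition column, matching the test's schema.
public class DataPartitionKeySelector implements KeySelector<Row, String> {
  @Override
  public String getKey(Row row) {
    return (String) row.getField(1);
  }
}
```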
| sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE); | ||
|
|
||
| Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName)); | ||
| SimpleDataUtil.assertTableRecords(table, ImmutableList.of( |
Check the records based on dataSet?
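One way to act on this suggestion (a sketch that reuses the test's existing context, assuming SimpleDataUtil offers a createRecord(Integer, String) helper matching the (id, data) schema):

```java
// Hypothetical alternative: derive the expected records from dataSet instead
// of enumerating them, so the assertion stays in sync with the source rows.
List<Record> expected = dataSet.stream()
    .map(row -> SimpleDataUtil.createRecord(
        (Integer) row.getField(0), (String) row.getField(1)))
    .collect(Collectors.toList());
SimpleDataUtil.assertTableRecords(table, expected);
```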
@openinx I have run the test hundreds of times locally in a loop, like you did before, and was never able to reproduce it. Thinking again about the root cause we discussed in the issue: we may miss the notifyCheckpointComplete callback. As a result, two checkpoint cycles get squashed into one Iceberg commit, leaving two files for a partition in a single Iceberg commit. I misunderstood the PR earlier; the change makes sure all rows go through one checkpoint cycle, which bypasses the potential problem with multiple checkpoint cycles.
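For readers unfamiliar with the failure mode, a simplified illustration of how a missed notifyCheckpointComplete squashes two cycles into one commit; this models the mechanism only and is not Iceberg's actual committer code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Results are buffered per checkpoint id and flushed on notifyCheckpointComplete.
// If the callback for checkpoint 1 is missed, the callback for checkpoint 2
// commits both cycles' files in ONE commit, leaving two files for the same
// partition in a single Iceberg commit, which is what the flaky assertion saw.
class BufferingCommitter {
  private final NavigableMap<Long, List<String>> pending = new TreeMap<>();

  void addResult(long checkpointId, String dataFile) {
    pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(dataFile);
  }

  void notifyCheckpointComplete(long checkpointId) {
    // Commit everything accumulated up to and including this checkpoint.
    NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
    List<String> toCommit = new ArrayList<>();
    ready.values().forEach(toCommit::addAll);
    ready.clear();
    commit(toCommit); // one Iceberg commit, possibly covering multiple cycles
  }

  private void commit(List<String> dataFiles) {
    // append dataFiles to the table in a single commit
  }
}
```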
stevenzwu left a comment
LGTM.
nit: can we change the description to "by ingesting all rows in one checkpoint cycle"? Earlier I misunderstood the PR; I mistakenly thought we were still doing multiple checkpoint cycles and were just precisely controlling the rows in each cycle.
@stevenzwu The root cause is that the previous design could not guarantee a single checkpoint would commit all rows in a given transaction; here is another example. That's why this PR now tries to guarantee it. The new description looks good to me if you think it's clearer.
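As I read the fix (an assumption about the test utility's semantics, not something stated verbatim in this thread), BoundedTableFactory takes a List&lt;List&lt;Row&gt;&gt; in which each inner list is emitted within one checkpoint cycle, so passing a single inner list forces all rows into one cycle:

```java
// One inner list: all rows land in a single checkpoint cycle (the fix).
String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));

// Two inner lists: rows split across two checkpoint cycles, the shape that
// could trip the assertion when a notifyCheckpointComplete is missed.
// firstBatch and secondBatch are hypothetical names for illustration.
String flakyDataId = BoundedTableFactory.registerDataSet(
    ImmutableList.of(firstBatch, secondBatch));
```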
@openinx Looks good. Can you merge this? It should be safe.
Thanks for fixing the flaky test, @openinx!
… one checkpoint cycle (apache#4189)
This PR fixes the flaky testHashDistributeMode unit test at its root. The following is the explanation of the current fix.